package com.xb.chain.netty.message.handle;


import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
import com.xb.chain.block.AbstractBlockChain;
import com.xb.chain.constant.InfoConstant;
import com.xb.chain.constant.MessageType;
import com.xb.chain.entity.Block;
import com.xb.chain.entity.Result;
import com.xb.chain.netty.message.entity.ConnectionMessage;
import com.xb.chain.netty.message.entity.GetChainHeightMessage;
import com.xb.chain.netty.message.entity.Message;
import com.xb.chain.netty.message.entity.SyncNodeMessage;
import com.xb.chain.netty.server.NodeServerStart;
import com.xb.chain.utils.BlockUtils;
import com.xb.chain.utils.MessageProxyUtils;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class MessageHandle {
    public static LinkedBlockingDeque<Message> linkedHashSet = new LinkedBlockingDeque<>();


    public static void queueStart(){
        for(;;){
            try {
                Message take = linkedHashSet.take();
                handleMessage(take);
            } catch (InterruptedException e) {
                log.error("消息处理队列出现异常：{}",e);
            }
        }
    }
    private static void handleMessage(Message message){
        Integer messageType = message.getMessageType();
        switch (messageType){
            // 普通消息
            case MessageType.STRING_MESSAGE:{
                log.info("普通消息:" + message.getMessage());
                break;
            }
            // 新连接信息
            case MessageType.NEW_CONNECTION: {
                ConnectionMessage connectionMessage = MessageProxyUtils.converToBean(message,  ConnectionMessage.class);
                //　创建客户端进行连接
                connectionMessage.serverConnection();
                log.info("有新节点加入");
                break;
            }
            // 同步节点
            case MessageType.SYNC_BLOCK_INFO: {
                SyncNodeMessage syncNodeMessage = MessageProxyUtils.converToBean(message, SyncNodeMessage.class);
                Channel channel = InfoConstant.getClientChannel(syncNodeMessage.getIp(), syncNodeMessage.getPort());
                if (!ObjectUtil.isEmpty(channel)) {
                    // 判断区块链是否正确
                    Result result = AbstractBlockChain.validataBlockChain();
                    if (result.getCode() == 0) {
                        syncNodeMessage.setIp(NodeServerStart.blockChainProperties.getServerIp());
                        syncNodeMessage.setList(AbstractBlockChain.list);
                        syncNodeMessage.setPort(NodeServerStart.blockChainProperties.getPort());
                        Message<SyncNodeMessage> syncNodeMessageMessage = Message.createMessage(MessageType.ACK_BLOCK_INFO, syncNodeMessage);
                        channel.writeAndFlush(JSONUtil.toJsonStr(syncNodeMessageMessage) + "\r\n");
                    } else {
                        log.error("节点区块链出现异常,输出：{}", result.getMessage());
                    }

                }
                break;
            }
            // 同步节点返回来的信息
            case MessageType.ACK_BLOCK_INFO: {
                SyncNodeMessage syncNodeMessage = MessageProxyUtils.converToBean(message, SyncNodeMessage.class);
                CopyOnWriteArrayList<Block> list = syncNodeMessage.getList();
                // 需要做处理，list大小为1说明是刚创建的节点，直接赋值，大于1说明，需要同步节点
                if (AbstractBlockChain.list.size() == 1) {
                    AbstractBlockChain.list = list;
                } else {
                    // 加锁
                    ReentrantLock lock = new ReentrantLock();
                    try{
                        lock.lock();
                        Result result = BlockUtils.compareBlockChain(AbstractBlockChain.list, list);
                        if (result.getCode() == 0) {
                            log.info("同步区块链数据成功");
                        } else {
                            log.info("同步区块链数据失败，失败信息：{}", result.getMessage());
                        }
                    }finally {
                        lock.unlock();
                    }
                }

                break;
            }

            // 请求获取区块链的高度
            case MessageType.GET_CHAIN_HEIGHT:{
                GetChainHeightMessage getChainHeightMessage = MessageProxyUtils.converToBean(message, GetChainHeightMessage.class);
                Integer size = AbstractBlockChain.list.size();
                String ip = getChainHeightMessage.getIp();
                Integer port = getChainHeightMessage.getPort();
                getChainHeightMessage.setHeight(size);
                // 设置本节点的IP和Port
                getChainHeightMessage.setIp(NodeServerStart.blockChainProperties.getServerIp());
                getChainHeightMessage.setPort(NodeServerStart.blockChainProperties.getPort());
                ClientToServerMessage.basisIpAndPortSendMessage(ip,port,Message.createMessage(MessageType.RETURN_CHAIN_HEIGHT,getChainHeightMessage));
                break;
            }

            // 节点返回的区块链高度信息
            case MessageType.RETURN_CHAIN_HEIGHT: {
                GetChainHeightMessage getChainHeightMessage = MessageProxyUtils.converToBean(message, GetChainHeightMessage.class);
                log.info("ip:{},port:{},height:{}", getChainHeightMessage.getIp(), getChainHeightMessage.getPort(), getChainHeightMessage.getHeight());
                // 判断如果节点高度大于现在节点高度，就需要同步
                if (getChainHeightMessage.getHeight() > AbstractBlockChain.list.size()) {
                    // 告诉对方是本节点需要同步
                    SyncNodeMessage syncNodeMessage = SyncNodeMessage.builder().ip(NodeServerStart.blockChainProperties.getServerIp()).port(NodeServerStart.blockChainProperties.getPort()).build();
                    Message<SyncNodeMessage> syncMessage = Message.createMessage(MessageType.SYNC_BLOCK_INFO, syncNodeMessage);
                    // ip为对方同步的节点
                    ClientToServerMessage.basisIpAndPortSendMessage(getChainHeightMessage.getIp(), getChainHeightMessage.getPort(), syncMessage);
                }
                break;
            }
        }


    }



}
